Using Kafka's Native API

① Consumer with automatic offset commit
Define your own producer:
```java
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

public class MyKafkaProducer {

    private KafkaProducer<Integer, String> producer;

    public MyKafkaProducer() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Enable batching: a batch is sent once it reaches 16 KB
        properties.put("batch.size", 16384);
        // Wait at most 50 ms; after 50 ms a batch is sent even if it is not full
        properties.put("linger.ms", 50);
        this.producer = new KafkaProducer<>(properties);
    }

    public boolean sendMsg() {
        boolean result = true;
        try {
            // Plain send: "test2" is the topic, 0 the partition,
            // 1 the key, and "hello world" the message payload
            final ProducerRecord<Integer, String> record =
                    new ProducerRecord<>("test2", 0, 1, "hello world");
            producer.send(record);

            // Send with an anonymous callback
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    // recordMetadata is null when the send failed
                    if (e == null) {
                        System.out.println(recordMetadata.topic());
                        System.out.println(recordMetadata.partition());
                        System.out.println(recordMetadata.offset());
                    }
                }
            });

            // Send with a callback class of our own
            producer.send(record, new MyCallback(record));
        } catch (Exception e) {
            result = false;
        }
        return result;
    }
}
```
Define the callback that is invoked when the producer finishes sending:
```java
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

public class MyCallback implements Callback {

    private Object msg;

    public MyCallback(Object msg) {
        this.msg = msg;
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception e) {
        System.out.println("topic = " + metadata.topic());
        System.out.println("partition = " + metadata.partition());
        System.out.println("offset = " + metadata.offset());
        System.out.println(msg);
    }
}
```
Producer test class: I hit a pitfall here. Without the final sleep, the code looked correct no matter how many times I checked it, yet the message was never delivered; adding a sleep fixed it.
The reason is that the main function exits before the producer has actually finished sending, so we have to wait a moment (a cleaner alternative is shown after the code). Of course, you probably won't run into this in a production service!
The code:
```java
import static java.lang.Thread.sleep;

public class MyKafkaProducerTest {

    public static void main(String[] args) throws InterruptedException {
        MyKafkaProducer producer = new MyKafkaProducer();
        boolean result = producer.sendMsg();
        System.out.println("send msg " + result);
        // Give the background sender thread time to deliver the batch
        sleep(1000);
    }
}
```
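The sleep works, but a cleaner fix is to drain the producer explicitly before the JVM exits. A minimal sketch, assuming we add a close() method of our own to MyKafkaProducer (this method is not in the original class):

```java
// Hypothetical addition to MyKafkaProducer:
// flush() blocks until all buffered records have been sent,
// close() then releases the client's resources.
public void close() {
    producer.flush();
    producer.close();
}
```

The test class can then call producer.close() instead of sleeping.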
Consumer class:
```java
import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class MyKafkaConsumer extends ShutdownableThread {

    private KafkaConsumer<Integer, String> consumer;

    public MyKafkaConsumer() {
        super("KafkaConsumerTest", false);
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094");
        properties.put("group.id", "mygroup");
        // Auto-commit offsets every second
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("heartbeat.interval.ms", "10000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumer = new KafkaConsumer<>(properties);
    }

    @Override
    public void doWork() {
        consumer.subscribe(Arrays.asList("test2"));
        ConsumerRecords<Integer, String> records = consumer.poll(1000);
        for (ConsumerRecord<Integer, String> record : records) {
            System.out.println("topic = " + record.topic());
            System.out.println("partition = " + record.partition());
            System.out.println("key = " + record.key());
            System.out.println("value = " + record.value());
        }
    }
}
```
Consumer test class:
```java
public class MyConsumerTest {

    public static void main(String[] args) {
        MyKafkaConsumer consumer = new MyKafkaConsumer();
        consumer.start();
        System.out.println("==================");
    }
}
```
② Consumer with synchronous manual commit
The consumers so far consume messages from the broker with automatically committed offsets, but auto-commit can lead to messages being consumed more than once: if a consumer crashes after processing a batch but before the next auto-commit fires, those records are redelivered after the restart.
In production, therefore, offsets often need to be committed manually to avoid this duplicate consumption.
Manual commits can be divided into synchronous, asynchronous, and combined synchronous/asynchronous commits (a sketch of the combined style follows the asynchronous example below). These styles differ only in the doWork() method; the constructor is the same.
So below we first adjust the constructor of the earlier consumer class, and then implement the different commit styles.
With a synchronous commit, the consumer submits the offset to the broker and then waits for a successful response; if none arrives, it resubmits until it gets one.
The consumer is blocked throughout that wait, which severely limits its throughput.
Modify the earlier MyKafkaConsumer.java; the changes are mainly in the configuration:
```java
import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class MyKafkaConsumer extends ShutdownableThread {

    private KafkaConsumer<Integer, String> consumer;

    public MyKafkaConsumer() {
        super("KafkaConsumerTest", false);
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094");
        properties.put("group.id", "mygroup");
        // Switch to manual commit
        properties.put("enable.auto.commit", "false");
        // properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("heartbeat.interval.ms", "10000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumer = new KafkaConsumer<>(properties);
    }

    @Override
    public void doWork() {
        consumer.subscribe(Arrays.asList("test2"));
        ConsumerRecords<Integer, String> records = consumer.poll(1000);
        for (ConsumerRecord<Integer, String> record : records) {
            System.out.println("topic = " + record.topic());
            System.out.println("partition = " + record.partition());
            System.out.println("key = " + record.key());
            System.out.println("value = " + record.value());
        }
        // Manual synchronous commit: a no-argument commitSync() commits the
        // offsets of the entire batch returned by the last poll(), so it
        // belongs after the processing loop rather than inside it
        consumer.commitSync();
    }
}
```
③ Consumer with asynchronous manual commit
A synchronous manual commit has to wait for the broker's response, which is inefficient and hurts consumer throughput.
With an asynchronous commit, the consumer submits the offset to the broker without waiting for a response, which raises its throughput.
```java
import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class MyKafkaConsumer extends ShutdownableThread {

    private KafkaConsumer<Integer, String> consumer;

    public MyKafkaConsumer() {
        super("KafkaConsumerTest", false);
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094");
        properties.put("group.id", "mygroup");
        // Switch to manual commit
        properties.put("enable.auto.commit", "false");
        // properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("heartbeat.interval.ms", "10000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumer = new KafkaConsumer<>(properties);
    }

    @Override
    public void doWork() {
        consumer.subscribe(Arrays.asList("test2"));
        ConsumerRecords<Integer, String> records = consumer.poll(1000);
        for (ConsumerRecord<Integer, String> record : records) {
            System.out.println("topic = " + record.topic());
            System.out.println("partition = " + record.partition());
            System.out.println("key = " + record.key());
            System.out.println("value = " + record.value());
        }
        // Manual synchronous commit
        // consumer.commitSync();
        // Manual asynchronous commit, fire-and-forget
        // consumer.commitAsync();
        // Manual asynchronous commit with a callback that reports failures
        consumer.commitAsync((offsets, e) -> {
            if (e != null) {
                System.out.println("commit failed, offsets = " + offsets);
                System.out.println("exception = " + e);
            }
        });
    }
}
```
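The combined style mentioned above is not implemented in these classes. The usual pattern, shown here as a sketch rather than as part of MyKafkaConsumer (running is an assumed loop flag), is to commit asynchronously on the hot path for throughput, then make one final blocking commit on shutdown so the last offsets are not lost:

```java
// Sketch of a combined sync/async commit loop (not part of the classes above)
try {
    while (running) {
        ConsumerRecords<Integer, String> records = consumer.poll(1000);
        for (ConsumerRecord<Integer, String> record : records) {
            System.out.println("value = " + record.value());
        }
        // Non-blocking; an occasional failure is covered by the next commit
        consumer.commitAsync();
    }
} finally {
    try {
        // Blocking; retried internally until it succeeds
        consumer.commitSync();
    } finally {
        consumer.close();
    }
}
```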
Using Kafka with Spring Boot

Most projects nowadays are Spring Boot applications that you simply start up, and sticking to the raw client API there feels a bit low-tech. So how do Kafka and Spring Boot work together?
Maven configuration:
```xml
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.1</version>
</dependency>
<!-- KafkaTemplate and @KafkaListener used below come from spring-kafka;
     its version is managed by the Spring Boot parent -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
```
Add the configuration: put the following settings into application.properties.
Kafka connection address:
```properties
spring.kafka.bootstrap-servers=192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094
```
Producer:
```properties
# acks=0: the producer does not wait for any acknowledgement
# (note that retries has no effect when acks=0)
spring.kafka.producer.acks=0
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.retries=3
spring.kafka.producer.batch-size=4096
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.compression-type=gzip
```
Consumer:
```properties
spring.kafka.consumer.group-id=mygroup
spring.kafka.consumer.auto-commit-interval=5000
spring.kafka.consumer.heartbeat-interval=3000
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
# listener: the number of concurrent listener threads
spring.kafka.listener.concurrency=8
# the topic name
kafka.topic1=topic1
```
Producer:
```java
import javax.annotation.Resource;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class MyKafkaProducerServiceImpl implements MyKafkaProducerService {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    // Topic name read from the configuration file
    @Value("${kafka.topic1}")
    private String topic;

    @Override
    public void sendKafka() {
        kafkaTemplate.send(topic, "hello world");
    }
}
```
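kafkaTemplate.send() is asynchronous; in spring-kafka 2.x it returns a ListenableFuture&lt;SendResult&lt;K, V&gt;&gt;, so if you want delivery confirmation like the native-API callbacks earlier, you can register one. A sketch (the sendKafkaWithCallback method is our own addition, not part of the original service):

```java
// Sketch: asynchronous send with success and failure callbacks
public void sendKafkaWithCallback() {
    kafkaTemplate.send(topic, "hello world").addCallback(
            result -> log.info("sent to partition {} at offset {}",
                    result.getRecordMetadata().partition(),
                    result.getRecordMetadata().offset()),
            ex -> log.error("send failed", ex));
}
```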
Consumer:
```java
import java.util.Optional;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MyKafkaConsumer {

    @KafkaListener(topics = "${kafka.topic1}")
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            log.info("----------------- record = " + record);
            log.info("----------------- message = " + kafkaMessage.get());
        }
    }
}
```
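Spring can also express the manual-commit styles from the native-API section. A sketch, assuming spring.kafka.consumer.enable-auto-commit=false and spring.kafka.listener.ack-mode=manual are set in application.properties, and org.springframework.kafka.support.Acknowledgment is imported:

```java
// Sketch: manual offset commit in a Spring listener.
// Requires enable-auto-commit=false and listener ack-mode=manual.
@KafkaListener(topics = "${kafka.topic1}")
public void listenManual(ConsumerRecord<?, ?> record, Acknowledgment ack) {
    log.info("record = " + record);
    ack.acknowledge(); // commits the offset of this record
}
```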